{
 "cells": [
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Use a federated learning strategy\n",
    "\n",
    "Welcome to the next part of the federated learning tutorial. In previous parts of this tutorial, we introduced federated learning with PyTorch and Flower ([part 1](https://flower.ai/docs/framework/tutorial-get-started-with-flower-pytorch.html)).\n",
    "\n",
    "In this notebook, we'll begin to customize the federated learning system we built in the introductory notebook again, using the Flower framework, Flower Datasets, and PyTorch.\n",
    "\n",
    "> [Star Flower on GitHub](https://github.com/adap/flower) ⭐️ and join the Flower community on Flower Discuss and the Flower Slack to connect, ask questions, and get help:\n",
    "> - [Join Flower Discuss](https://discuss.flower.ai/) We'd love to hear from you in the `Introduction` topic! If anything is unclear, post in `Flower Help - Beginners`.\n",
    "> - [Join Flower Slack](https://flower.ai/join-slack) We'd love to hear from you in the `#introductions` channel! If anything is unclear, head over to the `#questions` channel.\n",
    "\n",
    "Let's move beyond FedAvg with Flower strategies! 🌼"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Preparation\n",
    "\n",
    "Before we begin with the actual code, let's make sure that we have everything we need."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Installing dependencies\n",
    "\n",
    "First, we install the necessary packages:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install -q flwr[simulation] flwr-datasets[vision] torch torchvision"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now that we have all dependencies installed, we can import everything we need for this tutorial:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from collections import OrderedDict\n",
    "from typing import Dict, List, Optional, Tuple\n",
    "\n",
    "import numpy as np\n",
    "import torch\n",
    "import torch.nn as nn\n",
    "import torch.nn.functional as F\n",
    "import torchvision.transforms as transforms\n",
    "from torch.utils.data import DataLoader\n",
    "\n",
    "import flwr\n",
    "from flwr.client import Client, ClientApp, NumPyClient\n",
    "from flwr.server import ServerApp, ServerConfig, ServerAppComponents\n",
    "from flwr.server.strategy import FedAvg, FedAdagrad\n",
    "from flwr.simulation import run_simulation\n",
    "from flwr_datasets import FederatedDataset\n",
    "from flwr.common import ndarrays_to_parameters, NDArrays, Scalar, Context\n",
    "\n",
    "DEVICE = torch.device(\"cpu\")  # Try \"cuda\" to train on GPU\n",
    "print(f\"Training on {DEVICE}\")\n",
    "print(f\"Flower {flwr.__version__} / PyTorch {torch.__version__}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "It is possible to switch to a runtime that has GPU acceleration enabled (on Google Colab: `Runtime > Change runtime type > Hardware acclerator: GPU > Save`). Note, however, that Google Colab is not always able to offer GPU acceleration. If you see an error related to GPU availability in one of the following sections, consider switching back to CPU-based execution by setting `DEVICE = torch.device(\"cpu\")`. If the runtime has GPU acceleration enabled, you should see the output `Training on cuda`, otherwise it'll say `Training on cpu`."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Data loading\n",
    "\n",
    "Let's now load the CIFAR-10 training and test set, partition them into ten smaller datasets (each split into training and validation set), and wrap everything in their own `DataLoader`. We introduce a new parameter `num_partitions` which allows us to call `load_datasets` with different numbers of partitions."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "NUM_PARTITIONS = 10\n",
    "BATCH_SIZE = 32\n",
    "\n",
    "\n",
    "def load_datasets(partition_id: int, num_partitions: int):\n",
    "    fds = FederatedDataset(dataset=\"cifar10\", partitioners={\"train\": num_partitions})\n",
    "    partition = fds.load_partition(partition_id)\n",
    "    # Divide data on each node: 80% train, 20% test\n",
    "    partition_train_test = partition.train_test_split(test_size=0.2, seed=42)\n",
    "    pytorch_transforms = transforms.Compose(\n",
    "        [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]\n",
    "    )\n",
    "\n",
    "    def apply_transforms(batch):\n",
    "        # Instead of passing transforms to CIFAR10(..., transform=transform)\n",
    "        # we will use this function to dataset.with_transform(apply_transforms)\n",
    "        # The transforms object is exactly the same\n",
    "        batch[\"img\"] = [pytorch_transforms(img) for img in batch[\"img\"]]\n",
    "        return batch\n",
    "\n",
    "    partition_train_test = partition_train_test.with_transform(apply_transforms)\n",
    "    trainloader = DataLoader(\n",
    "        partition_train_test[\"train\"], batch_size=BATCH_SIZE, shuffle=True\n",
    "    )\n",
    "    valloader = DataLoader(partition_train_test[\"test\"], batch_size=BATCH_SIZE)\n",
    "    testset = fds.load_split(\"test\").with_transform(apply_transforms)\n",
    "    testloader = DataLoader(testset, batch_size=BATCH_SIZE)\n",
    "    return trainloader, valloader, testloader"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Model training/evaluation\n",
    "\n",
    "Let's continue with the usual model definition (including `set_parameters` and `get_parameters`), training and test functions:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class Net(nn.Module):\n",
    "    def __init__(self) -> None:\n",
    "        super(Net, self).__init__()\n",
    "        self.conv1 = nn.Conv2d(3, 6, 5)\n",
    "        self.pool = nn.MaxPool2d(2, 2)\n",
    "        self.conv2 = nn.Conv2d(6, 16, 5)\n",
    "        self.fc1 = nn.Linear(16 * 5 * 5, 120)\n",
    "        self.fc2 = nn.Linear(120, 84)\n",
    "        self.fc3 = nn.Linear(84, 10)\n",
    "\n",
    "    def forward(self, x: torch.Tensor) -> torch.Tensor:\n",
    "        x = self.pool(F.relu(self.conv1(x)))\n",
    "        x = self.pool(F.relu(self.conv2(x)))\n",
    "        x = x.view(-1, 16 * 5 * 5)\n",
    "        x = F.relu(self.fc1(x))\n",
    "        x = F.relu(self.fc2(x))\n",
    "        x = self.fc3(x)\n",
    "        return x\n",
    "\n",
    "\n",
    "def get_parameters(net) -> List[np.ndarray]:\n",
    "    return [val.cpu().numpy() for _, val in net.state_dict().items()]\n",
    "\n",
    "\n",
    "def set_parameters(net, parameters: List[np.ndarray]):\n",
    "    params_dict = zip(net.state_dict().keys(), parameters)\n",
    "    state_dict = OrderedDict({k: torch.Tensor(v) for k, v in params_dict})\n",
    "    net.load_state_dict(state_dict, strict=True)\n",
    "\n",
    "\n",
    "def train(net, trainloader, epochs: int):\n",
    "    \"\"\"Train the network on the training set.\"\"\"\n",
    "    criterion = torch.nn.CrossEntropyLoss()\n",
    "    optimizer = torch.optim.Adam(net.parameters())\n",
    "    net.train()\n",
    "    for epoch in range(epochs):\n",
    "        correct, total, epoch_loss = 0, 0, 0.0\n",
    "        for batch in trainloader:\n",
    "            images, labels = batch[\"img\"], batch[\"label\"]\n",
    "            images, labels = images.to(DEVICE), labels.to(DEVICE)\n",
    "            optimizer.zero_grad()\n",
    "            outputs = net(images)\n",
    "            loss = criterion(net(images), labels)\n",
    "            loss.backward()\n",
    "            optimizer.step()\n",
    "            # Metrics\n",
    "            epoch_loss += loss\n",
    "            total += labels.size(0)\n",
    "            correct += (torch.max(outputs.data, 1)[1] == labels).sum().item()\n",
    "        epoch_loss /= len(trainloader.dataset)\n",
    "        epoch_acc = correct / total\n",
    "        print(f\"Epoch {epoch+1}: train loss {epoch_loss}, accuracy {epoch_acc}\")\n",
    "\n",
    "\n",
    "def test(net, testloader):\n",
    "    \"\"\"Evaluate the network on the entire test set.\"\"\"\n",
    "    criterion = torch.nn.CrossEntropyLoss()\n",
    "    correct, total, loss = 0, 0, 0.0\n",
    "    net.eval()\n",
    "    with torch.no_grad():\n",
    "        for batch in testloader:\n",
    "            images, labels = batch[\"img\"], batch[\"label\"]\n",
    "            images, labels = images.to(DEVICE), labels.to(DEVICE)\n",
    "            outputs = net(images)\n",
    "            loss += criterion(outputs, labels).item()\n",
    "            _, predicted = torch.max(outputs.data, 1)\n",
    "            total += labels.size(0)\n",
    "            correct += (predicted == labels).sum().item()\n",
    "    loss /= len(testloader.dataset)\n",
    "    accuracy = correct / total\n",
    "    return loss, accuracy"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Flower client\n",
    "\n",
    "To implement the Flower client, we (again) create a subclass of `flwr.client.NumPyClient` and implement the three methods `get_parameters`, `fit`, and `evaluate`. Here, we also pass the `partition_id` to the client and use it log additional details. We then create an instance of `ClientApp` and pass it the `client_fn`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class FlowerClient(NumPyClient):\n",
    "    def __init__(self, partition_id, net, trainloader, valloader):\n",
    "        self.partition_id = partition_id\n",
    "        self.net = net\n",
    "        self.trainloader = trainloader\n",
    "        self.valloader = valloader\n",
    "\n",
    "    def get_parameters(self, config):\n",
    "        print(f\"[Client {self.partition_id}] get_parameters\")\n",
    "        return get_parameters(self.net)\n",
    "\n",
    "    def fit(self, parameters, config):\n",
    "        print(f\"[Client {self.partition_id}] fit, config: {config}\")\n",
    "        set_parameters(self.net, parameters)\n",
    "        train(self.net, self.trainloader, epochs=1)\n",
    "        return get_parameters(self.net), len(self.trainloader), {}\n",
    "\n",
    "    def evaluate(self, parameters, config):\n",
    "        print(f\"[Client {self.partition_id}] evaluate, config: {config}\")\n",
    "        set_parameters(self.net, parameters)\n",
    "        loss, accuracy = test(self.net, self.valloader)\n",
    "        return float(loss), len(self.valloader), {\"accuracy\": float(accuracy)}\n",
    "\n",
    "\n",
    "def client_fn(context: Context) -> Client:\n",
    "    net = Net().to(DEVICE)\n",
    "\n",
    "    # Read the node_config to fetch data partition associated to this node\n",
    "    partition_id = context.node_config[\"partition-id\"]\n",
    "    num_partitions = context.node_config[\"num-partitions\"]\n",
    "\n",
    "    trainloader, valloader, _ = load_datasets(partition_id, num_partitions)\n",
    "    return FlowerClient(partition_id, net, trainloader, valloader).to_client()\n",
    "\n",
    "\n",
    "# Create the ClientApp\n",
    "client = ClientApp(client_fn=client_fn)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Strategy customization\n",
    "\n",
    "So far, everything should look familiar if you've worked through the introductory notebook. With that, we're ready to introduce a number of new features. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Server-side parameter **initialization**\n",
    "\n",
    "Flower, by default, initializes the global model by asking one random client for the initial parameters. In many cases, we want more control over parameter initialization though. Flower therefore allows you to directly pass the initial parameters to the Strategy. We create an instance of `Net()` and get the paramaters as follows:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create an instance of the model and get the parameters\n",
    "params = get_parameters(Net())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, we create a `server_fn` that returns the components needed for the server. Within `server_fn`, we create a Strategy that uses the initial parameters."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def server_fn(context: Context) -> ServerAppComponents:\n",
    "    # Create FedAvg strategy\n",
    "    strategy = FedAvg(\n",
    "        fraction_fit=0.3,\n",
    "        fraction_evaluate=0.3,\n",
    "        min_fit_clients=3,\n",
    "        min_evaluate_clients=3,\n",
    "        min_available_clients=NUM_PARTITIONS,\n",
    "        initial_parameters=ndarrays_to_parameters(\n",
    "            params\n",
    "        ),  # Pass initial model parameters\n",
    "    )\n",
    "\n",
    "    # Configure the server for 3 rounds of training\n",
    "    config = ServerConfig(num_rounds=3)\n",
    "    return ServerAppComponents(strategy=strategy, config=config)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Passing `initial_parameters` to the `FedAvg` strategy prevents Flower from asking one of the clients for the initial parameters. In `server_fn`, we pass this new `strategy` and a `ServerConfig` for defining the number of federated learning rounds (`num_rounds`). \n",
    "\n",
    "Similar to the `ClientApp`, we now create the `ServerApp` using the `server_fn`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create ServerApp\n",
    "server = ServerApp(server_fn=server_fn)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Last but not least, we specify the resources for each client and run the simulation."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Specify the resources each of your clients need\n",
    "# If set to none, by default, each client will be allocated 2x CPU and 0x GPUs\n",
    "backend_config = {\"client_resources\": None}\n",
    "if DEVICE.type == \"cuda\":\n",
    "    backend_config = {\"client_resources\": {\"num_gpus\": 1}}\n",
    "\n",
    "# Run simulation\n",
    "run_simulation(\n",
    "    server_app=server,\n",
    "    client_app=client,\n",
    "    num_supernodes=NUM_PARTITIONS,\n",
    "    backend_config=backend_config,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    " If we look closely, we can see that the logs do not show any calls to the `FlowerClient.get_parameters` method."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Starting with a customized strategy\n",
    "\n",
    "We've seen the function `run_simulation` before. It accepts a number of arguments, amongst them the `server_app` which wraps around the strategy and number of training rounds, `client_app` which wraps around the `client_fn` used to create `FlowerClient` instances, and the number of clients to simulate which equals `num_supernodes`.\n",
    "\n",
    "The strategy encapsulates the federated learning approach/algorithm, for example, `FedAvg` or `FedAdagrad`. Let's try to use a different strategy this time:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def server_fn(context: Context) -> ServerAppComponents:\n",
    "    # Create FedAdagrad strategy\n",
    "    strategy = FedAdagrad(\n",
    "        fraction_fit=0.3,\n",
    "        fraction_evaluate=0.3,\n",
    "        min_fit_clients=3,\n",
    "        min_evaluate_clients=3,\n",
    "        min_available_clients=NUM_PARTITIONS,\n",
    "        initial_parameters=ndarrays_to_parameters(params),\n",
    "    )\n",
    "    # Configure the server for 3 rounds of training\n",
    "    config = ServerConfig(num_rounds=3)\n",
    "    return ServerAppComponents(strategy=strategy, config=config)\n",
    "\n",
    "\n",
    "# Create the ServerApp\n",
    "server = ServerApp(server_fn=server_fn)\n",
    "\n",
    "# Run simulation\n",
    "run_simulation(\n",
    "    server_app=server,\n",
    "    client_app=client,\n",
    "    num_supernodes=NUM_PARTITIONS,\n",
    "    backend_config=backend_config,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Server-side parameter **evaluation**\n",
    "\n",
    "Flower can evaluate the aggregated model on the server-side or on the client-side. Client-side and server-side evaluation are similar in some ways, but different in others.\n",
    "\n",
    "**Centralized Evaluation** (or *server-side evaluation*) is conceptually simple: it works the same way that evaluation in centralized machine learning does. If there is a server-side dataset that can be used for evaluation purposes, then that's great. We can evaluate the newly aggregated model after each round of training without having to send the model to clients. We're also fortunate in the sense that our entire evaluation dataset is available at all times.\n",
    "\n",
    "**Federated Evaluation** (or *client-side evaluation*) is more complex, but also more powerful: it doesn't require a centralized dataset and allows us to evaluate models over a larger set of data, which often yields more realistic evaluation results. In fact, many scenarios require us to use **Federated Evaluation** if we want to get representative evaluation results at all. But this power comes at a cost: once we start to evaluate on the client side, we should be aware that our evaluation dataset can change over consecutive rounds of learning if those clients are not always available. Moreover, the dataset held by each client can also change over consecutive rounds. This can lead to evaluation results that are not stable, so even if we would not change the model, we'd see our evaluation results fluctuate over consecutive rounds.\n",
    "\n",
    "We've seen how federated evaluation works on the client side (i.e., by implementing the `evaluate` method in `FlowerClient`). Now let's see how we can evaluate aggregated model parameters on the server-side:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# The `evaluate` function will be called by Flower after every round\n",
    "def evaluate(\n",
    "    server_round: int,\n",
    "    parameters: NDArrays,\n",
    "    config: Dict[str, Scalar],\n",
    ") -> Optional[Tuple[float, Dict[str, Scalar]]]:\n",
    "    net = Net().to(DEVICE)\n",
    "    _, _, testloader = load_datasets(0, NUM_PARTITIONS)\n",
    "    set_parameters(net, parameters)  # Update model with the latest parameters\n",
    "    loss, accuracy = test(net, testloader)\n",
    "    print(f\"Server-side evaluation loss {loss} / accuracy {accuracy}\")\n",
    "    return loss, {\"accuracy\": accuracy}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We create a `FedAvg` strategy and pass `evaluate_fn` to it. Then, we create a `ServerApp` that uses this strategy."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def server_fn(context: Context) -> ServerAppComponents:\n",
    "    # Create the FedAvg strategy\n",
    "    strategy = FedAvg(\n",
    "        fraction_fit=0.3,\n",
    "        fraction_evaluate=0.3,\n",
    "        min_fit_clients=3,\n",
    "        min_evaluate_clients=3,\n",
    "        min_available_clients=NUM_PARTITIONS,\n",
    "        initial_parameters=ndarrays_to_parameters(params),\n",
    "        evaluate_fn=evaluate,  # Pass the evaluation function\n",
    "    )\n",
    "    # Configure the server for 3 rounds of training\n",
    "    config = ServerConfig(num_rounds=3)\n",
    "    return ServerAppComponents(strategy=strategy, config=config)\n",
    "\n",
    "\n",
    "# Create the ServerApp\n",
    "server = ServerApp(server_fn=server_fn)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Finally, we run the simulation."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Run simulation\n",
    "run_simulation(\n",
    "    server_app=server,\n",
    "    client_app=client,\n",
    "    num_supernodes=NUM_PARTITIONS,\n",
    "    backend_config=backend_config,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Sending/receiving arbitrary values to/from clients\n",
    "\n",
    "In some situations, we want to configure client-side execution (training, evaluation) from the server-side. One example for that is the server asking the clients to train for a certain number of local epochs. Flower provides a way to send configuration values from the server to the clients using a dictionary. Let's look at an example where the clients receive values from the server through the `config` parameter in `fit` (`config` is also available in `evaluate`). The `fit` method receives the configuration dictionary through the `config` parameter and can then read values from this dictionary. In this example, it reads `server_round` and `local_epochs` and uses those values to improve the logging and configure the number of local training epochs:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class FlowerClient(NumPyClient):\n",
    "    def __init__(self, pid, net, trainloader, valloader):\n",
    "        self.pid = pid  # partition ID of a client\n",
    "        self.net = net\n",
    "        self.trainloader = trainloader\n",
    "        self.valloader = valloader\n",
    "\n",
    "    def get_parameters(self, config):\n",
    "        print(f\"[Client {self.pid}] get_parameters\")\n",
    "        return get_parameters(self.net)\n",
    "\n",
    "    def fit(self, parameters, config):\n",
    "        # Read values from config\n",
    "        server_round = config[\"server_round\"]\n",
    "        local_epochs = config[\"local_epochs\"]\n",
    "\n",
    "        # Use values provided by the config\n",
    "        print(f\"[Client {self.pid}, round {server_round}] fit, config: {config}\")\n",
    "        set_parameters(self.net, parameters)\n",
    "        train(self.net, self.trainloader, epochs=local_epochs)\n",
    "        return get_parameters(self.net), len(self.trainloader), {}\n",
    "\n",
    "    def evaluate(self, parameters, config):\n",
    "        print(f\"[Client {self.pid}] evaluate, config: {config}\")\n",
    "        set_parameters(self.net, parameters)\n",
    "        loss, accuracy = test(self.net, self.valloader)\n",
    "        return float(loss), len(self.valloader), {\"accuracy\": float(accuracy)}\n",
    "\n",
    "\n",
    "def client_fn(context: Context) -> Client:\n",
    "    net = Net().to(DEVICE)\n",
    "    partition_id = context.node_config[\"partition-id\"]\n",
    "    num_partitions = context.node_config[\"num-partitions\"]\n",
    "    trainloader, valloader, _ = load_datasets(partition_id, num_partitions)\n",
    "    return FlowerClient(partition_id, net, trainloader, valloader).to_client()\n",
    "\n",
    "\n",
    "# Create the ClientApp\n",
    "client = ClientApp(client_fn=client_fn)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "So how can we  send this config dictionary from server to clients? The built-in Flower Strategies provide way to do this, and it works similarly to the way server-side evaluation works. We provide a function to the strategy, and the strategy calls this function for every round of federated learning:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def fit_config(server_round: int):\n",
    "    \"\"\"Return training configuration dict for each round.\n",
    "\n",
    "    Perform two rounds of training with one local epoch, increase to two local\n",
    "    epochs afterwards.\n",
    "    \"\"\"\n",
    "    config = {\n",
    "        \"server_round\": server_round,  # The current round of federated learning\n",
    "        \"local_epochs\": 1 if server_round < 2 else 2,\n",
    "    }\n",
    "    return config"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, we'll pass this function to the FedAvg strategy before starting the simulation:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def server_fn(context: Context) -> ServerAppComponents:\n",
    "    # Create FedAvg strategy\n",
    "    strategy = FedAvg(\n",
    "        fraction_fit=0.3,\n",
    "        fraction_evaluate=0.3,\n",
    "        min_fit_clients=3,\n",
    "        min_evaluate_clients=3,\n",
    "        min_available_clients=NUM_PARTITIONS,\n",
    "        initial_parameters=ndarrays_to_parameters(params),\n",
    "        evaluate_fn=evaluate,\n",
    "        on_fit_config_fn=fit_config,  # Pass the fit_config function\n",
    "    )\n",
    "    config = ServerConfig(num_rounds=3)\n",
    "    return ServerAppComponents(strategy=strategy, config=config)\n",
    "\n",
    "\n",
    "# Create the ServerApp\n",
    "server = ServerApp(server_fn=server_fn)\n",
    "\n",
    "# Run simulation\n",
    "run_simulation(\n",
    "    server_app=server,\n",
    "    client_app=client,\n",
    "    num_supernodes=NUM_PARTITIONS,\n",
    "    backend_config=backend_config,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As we can see, the client logs now include the current round of federated learning (which they read from the `config` dictionary). We can also configure local training to run for one epoch during the first and second round of federated learning, and then for two epochs during the third round.\n",
    "\n",
    "Clients can also return arbitrary values to the server. To do so, they return a dictionary from `fit` and/or `evaluate`. We have seen and used this concept throughout this notebook without mentioning it explicitly: our `FlowerClient` returns a dictionary containing a custom key/value pair as the third return value in `evaluate`."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Scaling federated learning\n",
    "\n",
    "As a last step in this notebook, let's see how we can use Flower to experiment with a large number of clients."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "NUM_PARTITIONS = 1000"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note that we can reuse the `ClientApp` for different `num-partitions` since the Context is defined by the `num_supernodes` argument in `run_simulation()`. \n",
    "\n",
    "We now have 1000 partitions, each holding 45 training and 5 validation examples. Given that the number of training examples on each client is quite small, we should probably train the model a bit longer, so we configure the clients to perform 3 local training epochs. We should also adjust the fraction of clients selected for training during each round (we don't want all 1000 clients participating in every round), so we adjust `fraction_fit` to `0.025`, which means that only 2.5% of available clients (so 25 clients) will be selected for training each round:\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def fit_config(server_round: int):\n",
    "    config = {\n",
    "        \"server_round\": server_round,\n",
    "        \"local_epochs\": 3,\n",
    "    }\n",
    "    return config\n",
    "\n",
    "\n",
    "def server_fn(context: Context) -> ServerAppComponents:\n",
    "    # Create FedAvg strategy\n",
    "    strategy = FedAvg(\n",
    "        fraction_fit=0.025,  # Train on 25 clients (each round)\n",
    "        fraction_evaluate=0.05,  # Evaluate on 50 clients (each round)\n",
    "        min_fit_clients=20,\n",
    "        min_evaluate_clients=40,\n",
    "        min_available_clients=NUM_PARTITIONS,\n",
    "        initial_parameters=ndarrays_to_parameters(params),\n",
    "        on_fit_config_fn=fit_config,\n",
    "    )\n",
    "    config = ServerConfig(num_rounds=3)\n",
    "    return ServerAppComponents(strategy=strategy, config=config)\n",
    "\n",
    "\n",
    "# Create the ServerApp\n",
    "server = ServerApp(server_fn=server_fn)\n",
    "\n",
    "# Run simulation\n",
    "run_simulation(\n",
    "    server_app=server,\n",
    "    client_app=client,\n",
    "    num_supernodes=NUM_PARTITIONS,\n",
    "    backend_config=backend_config,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Recap\n",
    "\n",
    "In this notebook, we've seen how we can gradually enhance our system by customizing the strategy, initializing parameters on the server side, choosing a different strategy, and evaluating models on the server-side. That's quite a bit of flexibility with so little code, right?\n",
    "\n",
    "In the later sections, we've seen how we can communicate arbitrary values between server and clients to fully customize client-side execution. With that capability, we built a large-scale Federated Learning simulation using the Flower Virtual Client Engine and ran an experiment involving 1000 clients in the same workload - all in a Jupyter Notebook!"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Next steps\n",
    "\n",
    "Before you continue, make sure to join the Flower community on Flower Discuss ([Join Flower Discuss](https://discuss.flower.ai)) and on Slack ([Join Slack](https://flower.ai/join-slack/)).\n",
    "\n",
    "There's a dedicated `#questions` channel if you need help, but we'd also love to hear who you are in `#introductions`!\n",
    "\n",
    "The [Flower Federated Learning Tutorial - Part 3](https://flower.ai/docs/framework/tutorial-build-a-strategy-from-scratch-pytorch.html) shows how to build a fully custom `Strategy` from scratch."
   ]
  }
 ],
 "metadata": {
  "colab": {
   "name": "Flower-2-Strategies-in-FL-PyTorch.ipynb",
   "provenance": [],
   "toc_visible": true
  },
  "kernelspec": {
   "display_name": "Python 3.7.12 64-bit ('flower-3.7.12')",
   "language": "python",
   "name": "python3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
